Adaptive Sampling
Contents
Adaptive Sampling#
In this tutorial you will learn:
Basic acquisition using the BlueSky RunEngine
Generating basic acquisition plans for multiple sample environments on a simulated diffraction beamline
How to use the Bluesky Adaptive harness to readily integrate AI-agents with the beamline
Demonstration of Reinforcement Learning (RL) being used to optimize data collection strategies
For more bluesky tutorials, go to https://blueskyproject.io/tutorials.
Some of the software we use:#
Bluesky RunEngine for experiment orchestration (sequencing)
Bluesky Ophyd for device integration (for this demo, a simulated detector)
Bluesky Widgets components for live-updating (“streaming”) visualization
Matplotlib for visualization
Bluesky Adaptive, an adaptive “harness” for integrating an Agent in a feedback loop with the Bluesky RunEngine
Tensorflow for the model
%matplotlib widget
import matplotlib.pyplot as plt
from bluesky import RunEngine
from bluesky.plans import count
from utils.simulated_hardware import detector, sample_selector, select_sample
from utils.visualization import stream_to_figures
from utils.adaptive_recommendations import with_agent
detector.delay = 1
Make a “RunEngine”#
The RunEngine is responsible for orchestrating the operation of the beamline. It
Processes a set of instructions (a “plan”) from the user
Direct hardware, tracks what is moving when, and tries to clean up correct in the event of success or failures
Emits metadata and data in a streaming fashion for consumers (plots, models, storage, etc.)
RE = RunEngine()
We can now use RunEngine to move things and it collect data.
Switch between samples#
For this tutorial, we will be moving a sample_selector
ophyd device, which can switch between samples on our simulated beamline. We can read the current status of the motor to see which sample we are currently on.
sample_selector.read()
{'sample_selector': {'value': 0, 'timestamp': 1682197314.1015368}}
To move this motor, we could use the built-in bluesky mv
plan…
from bluesky.plan_stubs import mv
RE(mv(sample_selector, 2))
sample_selector.read()
{'sample_selector': {'value': 2, 'timestamp': 1682197314.7594862}}
Or, we can write our own custom plan with whatever language and extensions we wish.
def select_sample(sample_number, *, verbose=False):
if verbose:
print(f"Moving to sample {sample_number}")
yield from mv(sample_selector, sample_number)
RE(select_sample(0, verbose=True)) # This moves a sample positioner to place Sample 0 in the beam.
Moving to sample 0
()
Acquire images#
In this tutorial, we are focused on live streaming data as published by the RunEngine and formatted in the event-model. As such, we’re going to setup our visualization first, and then stream data in as we are measuring.
Functionally, this involves creating a figure and axes with Matplotlib, and then wrapping them in a callback function. The callback function will be streamed the documents as they are emitted by the RunEngine, unpack the documents, and update the visualization.
When the cell below is first run, you should see a checkerboard pattern and some basic metadata in the title (sample number, and number of shots on the sample).
fig, axes = plt.subplots(squeeze=False, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
Now we can select the sample we want and pass a count
plan into the run engine.
count
is the most basic acquisition plan in Bluesky. It takes as an argument a list of detectors. In this case, we will pass it both the detector
and the sample_selector
ophyd objects. In this way, the emitted documents will automatically contain both the image data from the detector and the sample number associated with this data (important metadata).
RE(select_sample(0))
()
RE(count([sample_selector, detector]), callback)
('6ed1da25-a9a4-4d68-8bc0-b6265f1e249c',)
If everything worked, you should have seen a diffraction pattern appear on the visualization, with the associated sample number (“Sample 0”) in the image title. This data is pulled from the streaming documents emitted by the RunEngine - we didn’t have to go get them and fully process them later!
Note that if you look at the immidiate output of the cell, you should see a long-string of letters and numbers. This serves as a universally unique identifier (uuid) for that particular measurment that will never be repeated. These uuids can be used as book-keeping identifiers for later data lookup, but that is beyond the scope of this tutorial.
So, what happens if we measure the same sample again?
RE(count([sample_selector, detector]), callback)
('361009de-cecb-472a-941c-5f329b1e1750',)
The stream_to_figures
callback we have setup here contains logic to average additional measurements of the same sample together. As such, you may have noticed that the image quality improved slightly when this second measurement was performed. The “N_shots” quantity in the plot title also reflects this number of shots on sample increasing.
You can try re-running that cell several times if you’d like, but we can expect little visual change. This is because sample 0 happens to be a ‘strong’ scatterer, and it’s resultant signal-to-noise quality is good over the background. In fact, even a single exposure produced an image with sufficient contrast to interpret scientifically.
Next, let’s set up a new figure, move to the next sample, and take a measurement.
fig, axes = plt.subplots(squeeze=False, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes, start_at=1)
RE(select_sample(1, verbose=True))
Moving to sample 1
()
RE(count([sample_selector, detector]), callback)
('a30a8387-8f06-4e02-9f08-8a6f0582c1e3',)
This sample doesn’t look nearly as good as the first one! It gives a weak signal (low signal-to-noise ratio). A single exposure does not produce sufficient contrast, but we can take additional exposures. The visualization above will display the average of all the exposures of this sample.
We could do that by hand, re-running the above cell several times. Or, we could write a custom plan to do this. Let’s go with the latter as it’s less tedious to manage. Plus, we can take advantage of the ability to use plans in other plans!
def multi_count(num_counts):
for i in range(num_counts):
print(f"Exposure number {i}")
yield from count([sample_selector, detector])
Note that we didn’t have to know anything about how the guts of count
worked, and we even made the decision to hardcode detector
and sample_selector
into the plan (though we wouldn’t need to, it’s up to how you want to write the plan).
unique_ids = RE(multi_count(5), callback)
Exposure number 0
Exposure number 1
Exposure number 2
Exposure number 3
Exposure number 4
Now the statistics on that weakly-scattering sample are starting to look better!
Acquire images for more than one sample#
We’ve already seen that samples 0 and 1 represent a strong and weakly scatterer respectively. Let’s say that based off these initial observations, we decide that the poorly-scattering sample need to be measured by some ratio (e.g. 4x) as often as good scattering samples. We probably also want to be able to define how many total loops to complete over both samples at the desired measurement ratio.
Let’s write a plan!
def multi_sample_plan(measure_ratio, total_loops=1):
for i in range(total_loops):
print(f"* Now on big loop {i}")
yield from select_sample(0, verbose=True)
yield from count([sample_selector, detector])
yield from select_sample(1, verbose=True)
for _ in range(measure_ratio):
yield from count([sample_selector, detector])
To make the visualization easier, let’s setup a callback that can display both samples at once as the data is streamed.
fig, axes = plt.subplots(1, 2, squeeze=False, constrained_layout=True, figsize=(5, 3))
callback = stream_to_figures(fig, axes)
unique_ids = RE(multi_sample_plan(4, total_loops=2), callback)
* Now on big loop 0
Moving to sample 0
Moving to sample 1
* Now on big loop 1
Moving to sample 0
Moving to sample 1
Dealing with many, many samples.#
We have thus far been playing with just a couple of samples. However, on the real PDF beamline at NSLS-II, we often load hundreds of different samples at one time. And each of these samples could have very different scattering qualities!
let’s write a custom plan to ‘sweep’ all samples on the simulated bracket. For this example, we’ll pretend to have 9-samples. We would like our plan to define the total number of individual shots to be taken.
def sequential_sweep(total_shots):
"Sweep over the samples in order. Take up to `total_shots` shots."
for shot in range(total_shots):
yield from select_sample(shot % 9)
yield from count([sample_selector, detector])
fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
Our simulated detector has a simulated “delay”, standing in for the exposure and readout time in a real detector. Here we’ll speed it up a bit just so the following examples runs faster.
detector.delay = 0.2
unique_ids = RE(sequential_sweep(total_shots=25), callback)
You no doubt have a combination of strong and weakly scattering samples. Note that this arrangment is random, and if you restart this Notebook, will likely be different. You could sit down and write a custom plan, based on the scattering strength of your unique simulated samples. However, this would get to be tedious work if performed more than once or twice. This is where AI can help us out!
Use bluesky-adaptive
to let an Agent drive the experiment#
Here, we introduce the bluesky-adaptive
plan. This is a plan designed to readily interface with any sort of decision-making agent via an ask/tell interface. More details of this plan design and implementation can be found at https://blueskyproject.io/bluesky-adaptive/
We will take advantage of a plan here called with_agent
which simplifies the interface here, but you can easily look at the details by digging into with_agent??
Agent 1: Naive Agent#
To begin with, let’s use a naive sequential agent. The goal of this agent is to, within a defined “budget” of time (i.e. scans), sequentially measure across each sample. This is effectively exactly what our sequential_sweep
plan above performed, but in the language of an agent in the bluesky-adaptive
machinery.
fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
from utils.adaptive_recommendations import NaiveAgent
NaiveAgent?
unique_ids = RE(
with_agent(NaiveAgent(9), max_shots=30),
callback,
)
This works well functionally, but it’s not a very efficient use of our time. After all, we would really prefer to measure the weaker-scatterers many more times than the strong scatterers, in order to built up better statistics. However, hand-writing a plan for every set of samples is not tractable. Let’s here use an agent trained using Reinforcement Learning to accomplish this task.
Agent 2: Reinforcement Learning Agent#
The details of how Reinforcement Learning (RL) works, or how we implemented it are beyond the scope of this short tutorial. However, interested readers can find details of the method here https://iopscience.iop.org/article/10.1088/2632-2153/abc9fc. Essentially, the language of RL allows us to easily “gamify” our scientific goals.
Prior to this tutorial, we trained an RL agent to develop an optimal policy that would enable for the following goals:
1.) Measure each sample initially once
2.) Repeat measurements on the samples, but collecting on weak scatterers 9x, and strong 1x.
3.) Having now built up sufficient statistics on the weak scatterers, continue to measuring each sample sequentially.
fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
Note: Here we replace the RL agent with a lightweight Monte Carlo approximation. This enables this tutorial to run quickly, overcomes challenges with outdated versioning of the agent, and retains the Tensorforce agent code in this repository as an example.
Nonetheless, from the standpoint of interfacing with Bluesky, we are again using the Bluesky-adaptive
machinery and the with_agent
helper plan.
from utils.adaptive_recommendations import RLAgent
detector.delay = 1
unique_ids = RE(
with_agent(RLAgent(9, "tf_models/bluesky-tutorial/saved_models"), max_shots=90),
callback,
)
Through the use of an RL agent, and Bluesky-adaptive
, we have measured the data more efficiently than the naive sequential plan, spending more time on the samples that need extra attention!
Bonus Agents#
On your own try out these agents that are at the extremes of knowing exactly what to do and knowing nothing about what it should do.
Any callable that matches the signature
def recommend(sample_number, badness)->
Agent 3: “Cheating” (Omniscient) Agent#
Just in case you want to try another agent, here is one who we have whispered the answer to ahead of time.
fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
from utils.adaptive_recommendations import CheatingAgent
unique_ids = RE(
with_agent(CheatingAgent(9), max_shots=90),
callback,
)
Note the RL agent probably performed just as well as the agent which had the right answer, because it was train ahead of time how to “play the game” we defined through our scientific goals.
Agent 4: “Chaos” (random) Agent#
Given this API we can plug in any logic we want to the plan, but that does not mean that the agent has to be good at its job! For example we can write an agent that will randomly pick a sample to measure next:
import random
def chaos_agent(x, y):
# ignores input!
return random.choice(range(9))
fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
unique_ids = RE(
with_agent(chaos_agent, max_shots=30),
callback,
)
You should see that measurements jump around between the samples. Given that the samples are also randomly strong or weak this is not actually better or worse than a sequential scan in terms of over / under collecting on the samples, but if we take into account the time to move the motor maybe significantly worse!